iT邦幫忙

2025 iThome 鐵人賽

DAY 21
0

前言

我們今天探討的狀況為另一個應用場景,在實際狀態,我們會面臨不同推理的需求,有些需要及時回應用戶請求
今天探討的主題為即時推理 Real-time Inference 以及批次處理 Batch Processing,根據實際專案選擇不同的方案

即時推理(Real-time Inference) vs 批次處理(Batch Processing):核心差異

即時推理(Real-time Inference)

  • 低延遲要求(通常在毫秒到秒級)
  • 單筆或小批次請求
  • 持續運行的端點
  • 適合互動式應用

典型應用場景

  • 聊天機器人即時回應
  • 圖像識別 API
  • 推薦系統即時推薦
  • 欺詐檢測系統
  • 語音轉文字服務

批次處理(Batch Processing)

  • 可容忍較高延遲(分鐘到小時級)
  • 大量資料批次處理
  • 按需執行,處理完即釋放資源
  • 成本效益高

典型應用場景

  • 定期的資料分析報告
  • 大規模圖像預處理
  • 離線推薦系統更新
  • 批次文字翻譯
  • 歷史資料重新評分

SageMaker 即時推理架構

  1. 標準即時端點部署 :
import boto3
import sagemaker
from sagemaker.huggingface import HuggingFaceModel

# 初始化
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()

# 配置 HuggingFace 模型
huggingface_model = HuggingFaceModel(
    model_data='s3://your-bucket/model.tar.gz',
    role=role,
    transformers_version='4.26',
    pytorch_version='1.13',
    py_version='py39',
    env={
        'HF_TASK': 'text-classification',
        'HF_MODEL_ID': 'distilbert-base-uncased'
    }
)

# 部署即時端點
predictor = huggingface_model.deploy(
    initial_instance_count=2,
    instance_type='ml.m5.xlarge',
    endpoint_name='sentiment-analysis-endpoint'
)

# 進行即時推理
result = predictor.predict({
    'inputs': "This product is amazing!"
})
print(result)

  1. 自動擴展(Auto Scaling)配置

(根據流量需求)

import boto3

# 初始化 Auto Scaling client
autoscaling_client = boto3.client('application-autoscaling')

# 註冊可擴展目標
response = autoscaling_client.register_scalable_target(
    ServiceNamespace='sagemaker',
    ResourceId=f'endpoint/sentiment-analysis-endpoint/variant/AllTraffic',
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    MinCapacity=1,
    MaxCapacity=10
)

# 配置目標追蹤擴展政策
autoscaling_client.put_scaling_policy(
    PolicyName='SageMakerEndpointInvocationScalingPolicy',
    ServiceNamespace='sagemaker',
    ResourceId=f'endpoint/sentiment-analysis-endpoint/variant/AllTraffic',
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    PolicyType='TargetTrackingScaling',
    TargetTrackingScalingPolicyConfiguration={
        'TargetValue': 750.0,  # 目標每分鐘調用次數
        'PredefinedMetricSpecification': {
            'PredefinedMetricType': 'SageMakerVariantInvocationsPerInstance'
        },
        'ScaleInCooldown': 300,
        'ScaleOutCooldown': 60
    }
)

  1. cache 提升效能

(針對重複請求我們增加 Cache 層處理)

import json
import hashlib
from datetime import datetime, timedelta

class CachedPredictor:
    def __init__(self, predictor, cache_ttl_seconds=300):
        self.predictor = predictor
        self.cache = {}
        self.cache_ttl = timedelta(seconds=cache_ttl_seconds)
    
    def _get_cache_key(self, input_data):
        """生成快取鍵值"""
        input_str = json.dumps(input_data, sort_keys=True)
        return hashlib.md5(input_str.encode()).hexdigest()
    
    def predict(self, input_data):
        """帶快取的推理"""
        cache_key = self._get_cache_key(input_data)
        
        # 檢查快取
        if cache_key in self.cache:
            cached_result, timestamp = self.cache[cache_key]
            if datetime.now() - timestamp < self.cache_ttl:
                print(f"Cache hit for key: {cache_key}")
                return cached_result
        
        # 快取未命中,進行實際推理
        result = self.predictor.predict(input_data)
        self.cache[cache_key] = (result, datetime.now())
        
        return result

# ex
cached_predictor = CachedPredictor(predictor, cache_ttl_seconds=600)
result = cached_predictor.predict({'inputs': "This is great!"})

SageMaker 批次轉換架構

  1. 批次基本作業

(處理大量離線資料)

from sagemaker.huggingface import HuggingFaceModel

# 使用已訓練的模型
huggingface_model = HuggingFaceModel(
    model_data='s3://your-bucket/model.tar.gz',
    role=role,
    transformers_version='4.26',
    pytorch_version='1.13',
    py_version='py39'
)

# 建立批次轉換器
batch_transformer = huggingface_model.transformer(
    instance_count=3,
    instance_type='ml.m5.xlarge',
    strategy='MultiRecord',
    max_payload=1,  # MB
    max_concurrent_transforms=8,
    output_path='s3://your-bucket/batch-output/'
)

# 執行批次轉換
batch_transformer.transform(
    data='s3://your-bucket/batch-input/',
    content_type='application/json',
    split_type='Line',
    join_source='Input'
)

# 等待完成
batch_transformer.wait()
  1. 準備批次輸入資料

(批次輸入需要特定格式,通常為 JSON Lines)

import json

def prepare_batch_input(texts, output_file):
    """準備批次推理的輸入檔案"""
    with open(output_file, 'w') as f:
        for text in texts:
            input_data = {'inputs': text}
            f.write(json.dumps(input_data) + '\n')

# 範例使用
sample_texts = [
    "This product exceeded my expectations!",
    "Terrible quality, would not recommend.",
    "Average product, nothing special.",
    "Best purchase I've made this year!"
]

prepare_batch_input(sample_texts, 'batch_input.jsonl')

# 上傳到 S3
s3_client = boto3.client('s3')
s3_client.upload_file(
    'batch_input.jsonl',
    'your-bucket',
    'batch-input/batch_input.jsonl'
)
  1. 處理批次輸出
import json

def process_batch_output(output_file):
    """處理批次推理的輸出結果"""
    results = []
    
    with open(output_file, 'r') as f:
        for line in f:
            result = json.loads(line)
            results.append(result)
    
    return results

# 從 S3 下載輸出
s3_client.download_file(
    'your-bucket',
    'batch-output/batch_input.jsonl.out',
    'batch_output.jsonl'
)

# 處理結果
results = process_batch_output('batch_output.jsonl')
for i, result in enumerate(results):
    print(f"Sample {i+1}: {result}")

混合架構: Lambda + SageMaker

結合 AWS Lambda 來建立更靈活的推理架構

import json
import boto3

def lambda_handler(event, context):
    """
    Lambda 函數作為 SageMaker 端點的前端
    處理請求預處理和結果後處理
    """
    runtime = boto3.client('sagemaker-runtime')
    
    # 從事件中提取輸入
    input_text = event.get('text', '')
    
    # 預處理
    payload = {
        'inputs': input_text,
        'parameters': {
            'max_length': 128,
            'temperature': 0.7
        }
    }
    
    try:
        # 調用 SageMaker 端點
        response = runtime.invoke_endpoint(
            EndpointName='sentiment-analysis-endpoint',
            ContentType='application/json',
            Body=json.dumps(payload)
        )
        
        # 解析回應
        result = json.loads(response['Body'].read().decode())
        
        # 後處理
        processed_result = {
            'statusCode': 200,
            'body': json.dumps({
                'input': input_text,
                'prediction': result,
                'timestamp': context.request_id
            })
        }
        
    except Exception as e:
        processed_result = {
            'statusCode': 500,
            'body': json.dumps({
                'error': str(e)
            })
        }
    
    return processed_result

進階 : 批次處理優化

  1. 使用 SageMaker Processing 進行大規模預處理

參考這篇

from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.sklearn.estimator import SKLearn

# 建立處理器
sklearn_processor = ScriptProcessor(
    image_uri='1342143124.dkr.ecr.<你的region>.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3',
    role=role,
    instance_type='ml.m5.xlarge',
    instance_count=5,
    command=['python3']
)

# 執行批次預處理
sklearn_processor.run(
    code='preprocessing.py',
    inputs=[
        ProcessingInput(
            source='s3://your-bucket/raw-data/',
            destination='/opt/ml/processing/input'
        )
    ],
    outputs=[
        ProcessingOutput(
            source='/opt/ml/processing/output',
            destination='s3://your-bucket/processed-data/'
        )
    ],
    arguments=['--batch-size', '1000']
)
  1. 預處理 script 範例
# preprocessing.py
import argparse
import os
import pandas as pd
from sklearn.preprocessing import StandardScaler
import joblib

def preprocess_batch(input_path, output_path, batch_size):
    """批次預處理資料"""
    
    # 讀取所有輸入檔案
    all_files = [os.path.join(input_path, f) for f in os.listdir(input_path)]
    
    for file in all_files:
        # 分批讀取大檔案
        for chunk in pd.read_csv(file, chunksize=batch_size):
            # 執行預處理
            processed = preprocess_data(chunk)
            
            # 儲存處理後的資料
            output_file = os.path.join(
                output_path,
                f"processed_{os.path.basename(file)}"
            )
            processed.to_csv(output_file, mode='a', index=False)

def preprocess_data(df):
    """實際的預處理邏輯"""
    # 移除缺失值
    df = df.dropna()
    
    # 標準化數值特徵
    numeric_cols = df.select_dtypes(include=['float64', 'int64']).columns
    scaler = StandardScaler()
    df[numeric_cols] = scaler.fit_transform(df[numeric_cols])
    
    return df

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--batch-size', type=int, default=1000)
    args = parser.parse_args()
    
    input_path = '/opt/ml/processing/input'
    output_path = '/opt/ml/processing/output'
    
    preprocess_batch(input_path, output_path, args.batch_size)

成本優化

即時推理成本優化

serverless inference 參考資料

# 1. 使用 Serverless Inference 處理間歇性流量
from sagemaker.serverless import ServerlessInferenceConfig

serverless_config = ServerlessInferenceConfig(
    memory_size_in_mb=2048,
    max_concurrency=10
)

predictor = huggingface_model.deploy(
    serverless_inference_config=serverless_config,
    endpoint_name='serverless-sentiment-endpoint'
)

# 2. 使用 Multi-Model Endpoints 共享資源
from sagemaker.multidatamodel import MultiDataModel

mme = MultiDataModel(
    name='multi-model-endpoint',
    model_data_prefix='s3://your-bucket/models/',
    model=huggingface_model,
    sagemaker_session=sagemaker_session
)

mme_predictor = mme.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.xlarge'
)

批次處理成本優化

使用 spot instance 降低成本

# 使用 Spot 實例降低成本
batch_transformer = huggingface_model.transformer(
    instance_count=5,
    instance_type='ml.m5.xlarge',
    use_spot_instances=True,
    max_wait_time=7200,  # 2 小時
    max_run_time=3600,   # 1 小時
    output_path='s3://your-bucket/batch-output/'
)

監控效能

使用 cloud watch

import boto3

cloudwatch = boto3.client('cloudwatch')

# 為即時端點建立延遲告警
cloudwatch.put_metric_alarm(
    AlarmName='HighModelLatency',
    ComparisonOperator='GreaterThanThreshold',
    EvaluationPeriods=2,
    MetricName='ModelLatency',
    Namespace='AWS/SageMaker',
    Period=60,
    Statistic='Average',
    Threshold=1000.0,  # 毫秒
    ActionsEnabled=True,
    AlarmActions=['arn:aws:sns:<你的 region>:123456789012:alerts'],
    Dimensions=[
        {
            'Name': 'EndpointName',
            'Value': 'sentiment-analysis-endpoint'
        },
        {
            'Name': 'VariantName',
            'Value': 'AllTraffic'
        }
    ]
)

# 為批次轉換建立失敗告警
cloudwatch.put_metric_alarm(
    AlarmName='BatchTransformFailures',
    ComparisonOperator='GreaterThanThreshold',
    EvaluationPeriods=1,
    MetricName='TransformJobsFailed',
    Namespace='AWS/SageMaker',
    Period=300,
    Statistic='Sum',
    Threshold=0,
    ActionsEnabled=True,
    AlarmActions=['arn:aws:sns:<你的 region>:123456789012:alerts']
)

今日小抱怨

做到 21 天,有點疲乏XD再加把勁!!


上一篇
多模態AI應用:文字+圖像處理
下一篇
監控與日誌分析實作
系列文
從零開始的AWS AI之路:用Bedrock與SageMaker打造智慧應用的30天實戰22
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言